/*
 * This file Copyright (C) 2007-2014 Mnemosyne LLC
 *
 * It may be used under the GNU GPL versions 2 or 3
 * or any future license endorsed by Mnemosyne LLC.
 *
 */

#include <algorithm>
#include <cerrno>
#include <cstdarg>
#include <cstdlib>
#include <cstring>
#include <memory> // std::unique_ptr

#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/event.h>

#include "transmission.h"
#include "cache.h"
#include "completion.h"
#include "file.h"
#include "log.h"
#include "peer-io.h"
#include "peer-mgr.h"
#include "peer-msgs.h"
#include "session.h"
#include "torrent.h"
#include "torrent-magnet.h"
#include "tr-assert.h"
#include "tr-dht.h"
#include "utils.h"
#include "variant.h"
#include "version.h"

#ifndef EBADMSG
#define EBADMSG EINVAL
#endif

/**
***
**/

enum
{
    BT_CHOKE = 0,
    BT_UNCHOKE = 1,
    BT_INTERESTED = 2,
    BT_NOT_INTERESTED = 3,
    BT_HAVE = 4,
    BT_BITFIELD = 5,
    BT_REQUEST = 6,
    BT_PIECE = 7,
    BT_CANCEL = 8,
    BT_PORT = 9,
    /* */
    BT_FEXT_SUGGEST = 13,
    BT_FEXT_HAVE_ALL = 14,
    BT_FEXT_HAVE_NONE = 15,
    BT_FEXT_REJECT = 16,
    BT_FEXT_ALLOWED_FAST = 17,
    /* */
    BT_LTEP = 20,
    /* */
    LTEP_HANDSHAKE = 0,
    /* */
    UT_PEX_ID = 1,
    UT_METADATA_ID = 3,
    /* */
    MIN_CHOKE_PERIOD_SEC = 10,
    /* idle seconds before we send a keepalive */
    KEEPALIVE_INTERVAL_SECS = 100,
    /* */
    PEX_INTERVAL_SECS = 90, /* sec between sendPex() calls */
    /* */
    REQQ = 512,
    /* */
    METADATA_REQQ = 64,
    /* */
    MAGIC_NUMBER = 21549,
    /* used in lowering the outMessages queue period */
    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
    HIGH_PRIORITY_INTERVAL_SECS = 2,
    LOW_PRIORITY_INTERVAL_SECS = 10,
    /* number of pieces we'll allow in our fast set */
    MAX_FAST_SET_SIZE = 3,
    /* how many blocks to keep prefetched per peer */
    PREFETCH_SIZE = 18,
    /* when we're making requests from another peer,
       batch them together to send enough requests to
       meet our bandwidth goals for the next N seconds */
    REQUEST_BUF_SECS = 10,
    /* defined in BEP #9 */
    METADATA_MSG_TYPE_REQUEST = 0,
    METADATA_MSG_TYPE_DATA = 1,
    METADATA_MSG_TYPE_REJECT = 2
};

namespace
{

constexpr int MAX_PEX_PEER_COUNT = 50;

} // unnamed namespace

enum
{
    AWAITING_BT_LENGTH,
    AWAITING_BT_ID,
    AWAITING_BT_MESSAGE,
    AWAITING_BT_PIECE
};

enum encryption_preference_t
{
    ENCRYPTION_PREFERENCE_UNKNOWN,
    ENCRYPTION_PREFERENCE_YES,
    ENCRYPTION_PREFERENCE_NO
};

/**
***
**/

struct peer_request
{
    uint32_t index;
    uint32_t offset;
    uint32_t length;
};

static peer_request blockToReq(tr_torrent const* tor, tr_block_index_t block)
{
    auto ret = peer_request{};
    tr_torrentGetBlockLocation(tor, block, &ret.index, &ret.offset, &ret.length);
    return ret;
}

/**
***
**/

/* this is raw, unchanged data from the peer regarding
 * the current message that it's sending us. */
struct tr_incoming
{
    uint8_t id = 0;
    uint32_t length = 0; /* includes the +1 for id length */
    struct peer_request blockReq = {}; /* metadata for incoming blocks */
    struct evbuffer* block = nullptr; /* piece data for incoming blocks */
};

class tr_peerMsgsImpl;
// TODO: make these to be member functions
static ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece);
static void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs);
static void didWrite(tr_peerIo* io, size_t bytesWritten, bool wasPieceData, void* vmsgs);
static void gotError(tr_peerIo* io, short what, void* vmsgs);
static void peerPulse(void* vmsgs);
static void pexPulse(evutil_socket_t fd, short what, void* vmsgs);
static void protocolSendCancel(tr_peerMsgsImpl* msgs, struct peer_request const& req);
static void protocolSendChoke(tr_peerMsgsImpl* msgs, bool choke);
static void protocolSendHave(tr_peerMsgsImpl* msgs, tr_piece_index_t index);
static void protocolSendPort(tr_peerMsgsImpl* msgs, uint16_t port);
static void sendInterest(tr_peerMsgsImpl* msgs, bool b);
static void sendLtepHandshake(tr_peerMsgsImpl* msgs);
static void tellPeerWhatWeHave(tr_peerMsgsImpl* msgs);
static void updateDesiredRequestCount(tr_peerMsgsImpl* msgs);
//zzz

struct EventDeleter
{
    void operator()(struct event* ev) const
    {
        event_free(ev);
    }
};

using UniqueTimer = std::unique_ptr<struct event, EventDeleter>;

/**
 * Low-level communication state information about a connected peer.
 *
 * This structure remembers the low-level protocol states that we're
 * in with this peer, such as active requests, pex messages, and so on.
 * Its fields are all private to peer-msgs.c.
 *
 * Data not directly involved with sending & receiving messages is
 * stored in tr_peer, where it can be accessed by both peermsgs and
 * the peer manager.
 *
 * @see struct peer_atom
 * @see tr_peer
 */
class tr_peerMsgsImpl : public tr_peerMsgs
{
public:
    tr_peerMsgsImpl(tr_torrent* torrent_in, peer_atom* atom_in, tr_peerIo* io_in, tr_peer_callback callback, void* callbackData)
        : tr_peerMsgs{ torrent_in, atom_in }
        , outMessagesBatchPeriod{ LOW_PRIORITY_INTERVAL_SECS }
        , torrent{ torrent_in }
        , outMessages{ evbuffer_new() }
        , io{ io_in }
        , callback_{ callback }
        , callbackData_{ callbackData }
    {
        if (tr_torrentAllowsPex(torrent))
        {
            pex_timer.reset(evtimer_new(torrent->session->event_base, pexPulse, this));
            tr_timerAdd(pex_timer.get(), PEX_INTERVAL_SECS, 0);
        }

        if (tr_peerIoSupportsUTP(io))
        {
            tr_address const* addr = tr_peerIoGetAddress(io, nullptr);
            tr_peerMgrSetUtpSupported(torrent, addr);
            tr_peerMgrSetUtpFailed(torrent, addr, false);
        }

        if (tr_peerIoSupportsLTEP(io))
        {
            sendLtepHandshake(this);
        }

        tellPeerWhatWeHave(this);

        if (tr_dhtEnabled(torrent->session) && tr_peerIoSupportsDHT(io))
        {
            /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
            struct tr_address const* addr = tr_peerIoGetAddress(io, nullptr);

            if (addr->type == TR_AF_INET || tr_globalIPv6() != nullptr)
            {
                protocolSendPort(this, tr_dhtPort(torrent->session));
            }
        }

        tr_peerIoSetIOFuncs(io, canRead, didWrite, gotError, this);
        updateDesiredRequestCount(this);
    }

    ~tr_peerMsgsImpl() override
    {
        set_active(TR_UP, false);
        set_active(TR_DOWN, false);

        if (this->incoming.block != nullptr)
        {
            evbuffer_free(this->incoming.block);
        }

        if (this->io != nullptr)
        {
            tr_peerIoClear(this->io);
            tr_peerIoUnref(this->io); /* balanced by the ref in handshakeDoneCB() */
        }

        evbuffer_free(this->outMessages);
        tr_free(this->pex6);
        tr_free(this->pex);
    }

    bool is_transferring_pieces(uint64_t now, tr_direction direction, unsigned int* setme_Bps) const override
    {
        auto const Bps = tr_peerIoGetPieceSpeed_Bps(io, now, direction);

        if (setme_Bps != nullptr)
        {
            *setme_Bps = Bps;
        }

        return Bps > 0;
    }

    bool is_peer_choked() const override
    {
        return peer_is_choked_;
    }

    bool is_peer_interested() const override
    {
        return peer_is_interested_;
    }

    bool is_client_choked() const override
    {
        return client_is_choked_;
    }

    bool is_client_interested() const override
    {
        return client_is_interested_;
    }

    bool is_utp_connection() const override
    {
        return io->socket.type == TR_PEER_SOCKET_TYPE_UTP;
    }

    bool is_encrypted() const override
    {
        return tr_peerIoIsEncrypted(io);
    }

    bool is_incoming_connection() const override
    {
        return tr_peerIoIsIncoming(io);
    }

    bool is_active(tr_direction direction) const override
    {
        TR_ASSERT(tr_isDirection(direction));
        auto const active = is_active_[direction];
        TR_ASSERT(active == calculate_active(direction));
        return active;
    }

    void update_active(tr_direction direction) override
    {
        TR_ASSERT(tr_isDirection(direction));

        set_active(direction, calculate_active(direction));
    }

    time_t get_connection_age() const override
    {
        return tr_peerIoGetAge(io);
    }

    bool is_reading_block(tr_block_index_t block) const override
    {
        return state == AWAITING_BT_PIECE && block == _tr_block(torrent, incoming.blockReq.index, incoming.blockReq.offset);
    }

    void cancel_block_request(tr_block_index_t block) override
    {
        protocolSendCancel(this, blockToReq(torrent, block));
    }

    void set_choke(bool peer_is_choked) override
    {
        time_t const now = tr_time();
        time_t const fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;

        if (chokeChangedAt > fibrillationTime)
        {
            // TODO dbgmsg(msgs, "Not changing choke to %d to avoid fibrillation", peer_is_choked);
        }
        else if (peer_is_choked_ != peer_is_choked)
        {
            peer_is_choked_ = peer_is_choked;

            if (peer_is_choked_)
            {
                cancelAllRequestsToClient(this);
            }

            protocolSendChoke(this, peer_is_choked_);
            chokeChangedAt = now;
            update_active(TR_CLIENT_TO_PEER);
        }
    }

    void pulse() override
    {
        peerPulse(this);
    }

    void on_piece_completed(tr_piece_index_t piece) override
    {
        protocolSendHave(this, piece);

        // since we have more pieces now, we might not be interested in this peer
        update_interest();
    }

    void set_interested(bool interested) override
    {
        if (client_is_interested_ != interested)
        {
            client_is_interested_ = interested;
            sendInterest(this, interested);
            update_active(TR_PEER_TO_CLIENT);
        }
    }

    void update_interest()
    {
        // TODO -- might need to poke the mgr on startup
    }

    // publishing events

    void publishError(int err)
    {
        auto e = tr_peer_event{};
        e.eventType = TR_PEER_ERROR;
        e.err = err;
        publish(e);
    }

    void publishGotBlock(struct peer_request const* req)
    {
        auto e = tr_peer_event{};
        e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
        e.pieceIndex = req->index;
        e.offset = req->offset;
        e.length = req->length;
        publish(e);
    }

    void publishGotRej(struct peer_request const* req)
    {
        auto e = tr_peer_event{};
        e.eventType = TR_PEER_CLIENT_GOT_REJ;
        e.pieceIndex = req->index;
        e.offset = req->offset;
        e.length = req->length;
        publish(e);
    }

    void publishGotChoke()
    {
        auto e = tr_peer_event{};
        e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
        publish(e);
    }

    void publishClientGotHaveAll()
    {
        auto e = tr_peer_event{};
        e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
        publish(e);
    }

    void publishClientGotHaveNone()
    {
        auto e = tr_peer_event{};
        e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
        publish(e);
    }

    void publishClientGotPieceData(uint32_t length)
    {
        auto e = tr_peer_event{};
        e.length = length;
        e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA;
        publish(e);
    }

    void publishPeerGotPieceData(uint32_t length)
    {
        auto e = tr_peer_event{};
        e.length = length;
        e.eventType = TR_PEER_PEER_GOT_PIECE_DATA;
        publish(e);
    }

    void publishClientGotSuggest(tr_piece_index_t pieceIndex)
    {
        auto e = tr_peer_event{};
        e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
        e.pieceIndex = pieceIndex;
        publish(e);
    }

    void publishClientGotPort(tr_port port)
    {
        auto e = tr_peer_event{};
        e.eventType = TR_PEER_CLIENT_GOT_PORT;
        e.port = port;
        publish(e);
    }

    void publishClientGotAllowedFast(tr_piece_index_t pieceIndex)
    {
        auto e = tr_peer_event{};
        e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
        e.pieceIndex = pieceIndex;
        publish(e);
    }

    void publishClientGotBitfield(Bitfield* bitfield)
    {
        auto e = tr_peer_event{};
        e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
        e.bitfield = bitfield;
        publish(e);
    }

    void publishClientGotHave(tr_piece_index_t index)
    {
        auto e = tr_peer_event{};
        e.eventType = TR_PEER_CLIENT_GOT_HAVE;
        e.pieceIndex = index;
        publish(e);
    }

private:
    bool calculate_active(tr_direction direction) const
    {
        if (direction == TR_CLIENT_TO_PEER)
        {
            return is_peer_interested() && !is_peer_choked();
        }

        // TR_PEER_TO_CLIENT

        if (!tr_torrentHasMetadata(torrent))
        {
            return true;
        }

        auto const active = is_client_interested() && !is_client_choked();
        TR_ASSERT(!active || !tr_torrentIsSeed(torrent));
        return active;
    }

    void set_active(tr_direction direction, bool active)
    {
        // TODO dbgmsg(msgs, "direction [%d] is_active [%d]", (int)direction, (int)is_active);
        auto& val = is_active_[direction];
        if (val != active)
        {
            val = active;

            tr_swarmIncrementActivePeers(torrent->swarm, direction, active);
        }
    }

    void publish(tr_peer_event const& e)
    {
        if (callback_ != nullptr)
        {
            (*callback_)(this, &e, callbackData_);
        }
    }

public:
    /* Whether or not we've choked this peer. */
    bool peer_is_choked_ = true;

    /* whether or not the peer has indicated it will download from us. */
    bool peer_is_interested_ = false;

    /* whether or not the peer is choking us. */
    bool client_is_choked_ = true;

    /* whether or not we've indicated to the peer that we would download from them if unchoked. */
    bool client_is_interested_ = false;

    bool peerSupportsPex = false;
    bool peerSupportsMetadataXfer = false;
    bool clientSentLtepHandshake = false;
    bool peerSentLtepHandshake = false;

    int desiredRequestCount = 0;

    int prefetchCount = 0;

    /* how long the outMessages batch should be allowed to grow before
     * it's flushed -- some messages (like requests >:) should be sent
     * very quickly; others aren't as urgent. */
    int8_t outMessagesBatchPeriod;

    uint8_t state = AWAITING_BT_LENGTH;
    uint8_t ut_pex_id = 0;
    uint8_t ut_metadata_id = 0;
    uint16_t pexCount = 0;
    uint16_t pexCount6 = 0;

    tr_port dht_port = 0;

    encryption_preference_t encryption_preference = ENCRYPTION_PREFERENCE_UNKNOWN;

    size_t metadata_size_hint = 0;
#if 0
    size_t fastsetSize;
    tr_piece_index_t fastset[MAX_FAST_SET_SIZE];
#endif

    tr_torrent* const torrent;

    evbuffer* const outMessages; /* all the non-piece messages */

    struct peer_request peerAskedFor[REQQ] = {};

    int peerAskedForMetadata[METADATA_REQQ] = {};
    int peerAskedForMetadataCount = 0;

    tr_pex* pex = nullptr;
    tr_pex* pex6 = nullptr;

    time_t clientSentAnythingAt = 0;

    time_t chokeChangedAt = 0;

    /* when we started batching the outMessages */
    time_t outMessagesBatchedAt = 0;

    struct tr_incoming incoming = {};

    /* if the peer supports the Extension Protocol in BEP 10 and
       supplied a reqq argument, it's stored here. Otherwise, the
       value is zero and should be ignored. */
    int64_t reqq = 0;

    UniqueTimer pex_timer;

    tr_peerIo* io = nullptr;

private:
    bool is_active_[2] = { false, false };

    tr_peer_callback const callback_;
    void* const callbackData_;
};

tr_peerMsgs* tr_peerMsgsNew(tr_torrent* torrent, peer_atom* atom, tr_peerIo* io, tr_peer_callback callback, void* callbackData)
{
    return new tr_peerMsgsImpl(torrent, atom, io, callback, callbackData);
}

/**
***
**/

static void myDebug(char const* file, int line, tr_peerMsgsImpl const* msgs, char const* fmt, ...) TR_GNUC_PRINTF(4, 5);

static void myDebug(char const* file, int line, tr_peerMsgsImpl const* msgs, char const* fmt, ...)
{
    tr_sys_file_t const fp = tr_logGetFile();

    if (fp != TR_BAD_SYS_FILE)
    {
        va_list args;
        char timestr[64];
        char addrstr[TR_ADDRSTRLEN];
        struct evbuffer* buf = evbuffer_new();
        char* base = tr_sys_path_basename(file, nullptr);
        char* message;

        evbuffer_add_printf(
            buf,
            "[%s] %s - %s [%s]: ",
            tr_logGetTimeStr(timestr, sizeof(timestr)),
            tr_torrentName(msgs->torrent),
            tr_peerIoGetAddrStr(msgs->io, addrstr, sizeof(addrstr)),
            tr_quark_get_string(msgs->client, nullptr));
        va_start(args, fmt);
        evbuffer_add_vprintf(buf, fmt, args);
        va_end(args);
        evbuffer_add_printf(buf, " (%s:%d)", base, line);

        message = evbuffer_free_to_str(buf, nullptr);
        tr_sys_file_write_line(fp, message, nullptr);

        tr_free(base);
        tr_free(message);
    }
}

#define dbgmsg(msgs, ...) \
    do \
    { \
        if (tr_logGetDeepEnabled()) \
        { \
            myDebug(__FILE__, __LINE__, msgs, __VA_ARGS__); \
        } \
    } while (0)

/**
***
**/

static void pokeBatchPeriod(tr_peerMsgsImpl* msgs, int interval)
{
    if (msgs->outMessagesBatchPeriod > interval)
    {
        msgs->outMessagesBatchPeriod = interval;
        dbgmsg(msgs, "lowering batch interval to %d seconds", interval);
    }
}

static void dbgOutMessageLen(tr_peerMsgsImpl* msgs)
{
    dbgmsg(msgs, "outMessage size is now %zu", evbuffer_get_length(msgs->outMessages));
}

static void protocolSendReject(tr_peerMsgsImpl* msgs, struct peer_request const* req)
{
    TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io));

    struct evbuffer* out = msgs->outMessages;

    evbuffer_add_uint32(out, sizeof(uint8_t) + 3 * sizeof(uint32_t));
    evbuffer_add_uint8(out, BT_FEXT_REJECT);
    evbuffer_add_uint32(out, req->index);
    evbuffer_add_uint32(out, req->offset);
    evbuffer_add_uint32(out, req->length);

    dbgmsg(msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length);
    dbgOutMessageLen(msgs);
}

static void protocolSendRequest(tr_peerMsgsImpl* msgs, struct peer_request const& req)
{
    struct evbuffer* out = msgs->outMessages;

    evbuffer_add_uint32(out, sizeof(uint8_t) + 3 * sizeof(uint32_t));
    evbuffer_add_uint8(out, BT_REQUEST);
    evbuffer_add_uint32(out, req.index);
    evbuffer_add_uint32(out, req.offset);
    evbuffer_add_uint32(out, req.length);

    dbgmsg(msgs, "requesting %u:%u->%u...", req.index, req.offset, req.length);
    dbgOutMessageLen(msgs);
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
}

static void protocolSendCancel(tr_peerMsgsImpl* msgs, peer_request const& req)
{
    struct evbuffer* out = msgs->outMessages;

    evbuffer_add_uint32(out, sizeof(uint8_t) + 3 * sizeof(uint32_t));
    evbuffer_add_uint8(out, BT_CANCEL);
    evbuffer_add_uint32(out, req.index);
    evbuffer_add_uint32(out, req.offset);
    evbuffer_add_uint32(out, req.length);

    dbgmsg(msgs, "cancelling %u:%u->%u...", req.index, req.offset, req.length);
    dbgOutMessageLen(msgs);
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
}

static void protocolSendPort(tr_peerMsgsImpl* msgs, uint16_t port)
{
    struct evbuffer* out = msgs->outMessages;

    dbgmsg(msgs, "sending Port %u", port);
    evbuffer_add_uint32(out, 3);
    evbuffer_add_uint8(out, BT_PORT);
    evbuffer_add_uint16(out, port);
}

static void protocolSendHave(tr_peerMsgsImpl* msgs, tr_piece_index_t index)
{
    struct evbuffer* out = msgs->outMessages;

    evbuffer_add_uint32(out, sizeof(uint8_t) + sizeof(uint32_t));
    evbuffer_add_uint8(out, BT_HAVE);
    evbuffer_add_uint32(out, index);

    dbgmsg(msgs, "sending Have %u", index);
    dbgOutMessageLen(msgs);
    pokeBatchPeriod(msgs, LOW_PRIORITY_INTERVAL_SECS);
}

#if 0

static void protocolSendAllowedFast(tr_peerMsgs* msgs, uint32_t pieceIndex)
{
    TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io));

    tr_peerIo* io = msgs->io;
    struct evbuffer* out = msgs->outMessages;

    evbuffer_add_uint32(io, out, sizeof(uint8_t) + sizeof(uint32_t));
    evbuffer_add_uint8(io, out, BT_FEXT_ALLOWED_FAST);
    evbuffer_add_uint32(io, out, pieceIndex);

    dbgmsg(msgs, "sending Allowed Fast %u...", pieceIndex);
    dbgOutMessageLen(msgs);
}

#endif

static void protocolSendChoke(tr_peerMsgsImpl* msgs, bool choke)
{
    struct evbuffer* out = msgs->outMessages;

    evbuffer_add_uint32(out, sizeof(uint8_t));
    evbuffer_add_uint8(out, choke ? BT_CHOKE : BT_UNCHOKE);

    dbgmsg(msgs, "sending %s...", choke ? "Choke" : "Unchoke");
    dbgOutMessageLen(msgs);
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
}

static void protocolSendHaveAll(tr_peerMsgsImpl* msgs)
{
    TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io));

    struct evbuffer* out = msgs->outMessages;

    evbuffer_add_uint32(out, sizeof(uint8_t));
    evbuffer_add_uint8(out, BT_FEXT_HAVE_ALL);

    dbgmsg(msgs, "sending HAVE_ALL...");
    dbgOutMessageLen(msgs);
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
}

static void protocolSendHaveNone(tr_peerMsgsImpl* msgs)
{
    TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io));

    struct evbuffer* out = msgs->outMessages;

    evbuffer_add_uint32(out, sizeof(uint8_t));
    evbuffer_add_uint8(out, BT_FEXT_HAVE_NONE);

    dbgmsg(msgs, "sending HAVE_NONE...");
    dbgOutMessageLen(msgs);
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
}

/**
***  ALLOWED FAST SET
***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
**/

#if 0

size_t tr_generateAllowedSet(tr_piece_index_t* setmePieces, size_t desiredSetSize, size_t pieceCount, uint8_t const* infohash,
    tr_address const* addr)
{
    TR_ASSERT(setmePieces != nullptr);
    TR_ASSERT(desiredSetSize <= pieceCount);
    TR_ASSERT(desiredSetSize != 0);
    TR_ASSERT(pieceCount != 0);
    TR_ASSERT(infohash != nullptr);
    TR_ASSERT(addr != nullptr);

    size_t setSize = 0;

    if (addr->type == TR_AF_INET)
    {
        uint8_t w[SHA_DIGEST_LENGTH + 4];
        uint8_t* walk = w;
        uint8_t x[SHA_DIGEST_LENGTH];

        uint32_t ui32 = ntohl(htonl(addr->addr.addr4.s_addr) & 0xffffff00); /* (1) */
        memcpy(w, &ui32, sizeof(uint32_t));
        walk += sizeof(uint32_t);
        memcpy(walk, infohash, SHA_DIGEST_LENGTH); /* (2) */
        walk += SHA_DIGEST_LENGTH;
        tr_sha1(x, w, walk - w, nullptr); /* (3) */
        TR_ASSERT(sizeof(w) == walk - w);

        while (setSize < desiredSetSize)
        {
            for (int i = 0; i < 5 && setSize < desiredSetSize; ++i) /* (4) */
            {
                uint32_t j = i * 4; /* (5) */
                uint32_t y = ntohl(*(uint32_t*)(x + j)); /* (6) */
                uint32_t index = y % pieceCount; /* (7) */
                bool found = false;

                for (size_t k = 0; !found && k < setSize; ++k) /* (8) */
                {
                    found = setmePieces[k] == index;
                }

                if (!found)
                {
                    setmePieces[setSize++] = index; /* (9) */
                }
            }

            tr_sha1(x, x, sizeof(x), nullptr); /* (3) */
        }
    }

    return setSize;
}

static void updateFastSet([[maybe_unused]] tr_peerMsgs* msgs)
{
    bool const fext = tr_peerIoSupportsFEXT(msgs->io);
    bool const peerIsNeedy = msgs->peer->progress < 0.10;

    if (fext && peerIsNeedy && !msgs->haveFastSet)
    {
        struct tr_address const* addr = tr_peerIoGetAddress(msgs->io, nullptr);
        tr_info const* inf = &msgs->torrent->info;
        size_t const numwant = std::min(MAX_FAST_SET_SIZE, inf->pieceCount);

        /* build the fast set */
        msgs->fastsetSize = tr_generateAllowedSet(msgs->fastset, numwant, inf->pieceCount, inf->hash, addr);
        msgs->haveFastSet = true;

        /* send it to the peer */
        for (size_t i = 0; i < msgs->fastsetSize; ++i)
        {
            protocolSendAllowedFast(msgs, msgs->fastset[i]);
        }
    }
}

#endif
/**
***  INTEREST
**/

static void sendInterest(tr_peerMsgsImpl* msgs, bool b)
{
    TR_ASSERT(msgs != nullptr);

    struct evbuffer* out = msgs->outMessages;

    dbgmsg(msgs, "Sending %s", b ? "Interested" : "Not Interested");
    evbuffer_add_uint32(out, sizeof(uint8_t));
    evbuffer_add_uint8(out, b ? BT_INTERESTED : BT_NOT_INTERESTED);

    pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
    dbgOutMessageLen(msgs);
}

static bool popNextMetadataRequest(tr_peerMsgsImpl* msgs, int* piece)
{
    if (msgs->peerAskedForMetadataCount == 0)
    {
        return false;
    }

    *piece = msgs->peerAskedForMetadata[0];

    tr_removeElementFromArray(msgs->peerAskedForMetadata, 0, sizeof(int), msgs->peerAskedForMetadataCount);
    --msgs->peerAskedForMetadataCount;

    return true;
}

static bool popNextRequest(tr_peerMsgsImpl* msgs, struct peer_request* setme)
{
    if (msgs->pendingReqsToClient == 0)
    {
        return false;
    }

    *setme = msgs->peerAskedFor[0];

    tr_removeElementFromArray(msgs->peerAskedFor, 0, sizeof(struct peer_request), msgs->pendingReqsToClient);
    --msgs->pendingReqsToClient;

    return true;
}

static void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs)
{
    struct peer_request req;
    bool const mustSendCancel = tr_peerIoSupportsFEXT(msgs->io);

    while (popNextRequest(msgs, &req))
    {
        if (mustSendCancel)
        {
            protocolSendReject(msgs, &req);
        }
    }
}

/**
***
**/

static bool reqIsValid(tr_peerMsgsImpl const* peer, uint32_t index, uint32_t offset, uint32_t length)
{
    return tr_torrentReqIsValid(peer->torrent, index, offset, length);
}

static bool requestIsValid(tr_peerMsgsImpl const* msgs, struct peer_request const* req)
{
    return reqIsValid(msgs, req->index, req->offset, req->length);
}

/**
***
**/

static void sendLtepHandshake(tr_peerMsgsImpl* msgs)
{
    tr_variant val;
    bool allow_pex;
    struct evbuffer* payload;
    struct evbuffer* out = msgs->outMessages;
    unsigned char const* ipv6 = tr_globalIPv6();
    static tr_quark version_quark = 0;

    if (msgs->clientSentLtepHandshake)
    {
        return;
    }

    if (version_quark == 0)
    {
        version_quark = tr_quark_new(TR_NAME " " USERAGENT_PREFIX, TR_BAD_SIZE);
    }

    dbgmsg(msgs, "sending an ltep handshake");
    msgs->clientSentLtepHandshake = true;

    /* decide if we want to advertise metadata xfer support (BEP 9) */
    bool const allow_metadata_xfer = !tr_torrentIsPrivate(msgs->torrent);

    /* decide if we want to advertise pex support */
    if (!tr_torrentAllowsPex(msgs->torrent))
    {
        allow_pex = false;
    }
    else if (msgs->peerSentLtepHandshake)
    {
        allow_pex = msgs->peerSupportsPex;
    }
    else
    {
        allow_pex = true;
    }

    tr_variantInitDict(&val, 8);
    tr_variantDictAddBool(&val, TR_KEY_e, msgs->session->encryptionMode != TR_CLEAR_PREFERRED);

    if (ipv6 != nullptr)
    {
        tr_variantDictAddRaw(&val, TR_KEY_ipv6, ipv6, 16);
    }

    if (allow_metadata_xfer && tr_torrentHasMetadata(msgs->torrent) && msgs->torrent->infoDictLength > 0)
    {
        tr_variantDictAddInt(&val, TR_KEY_metadata_size, msgs->torrent->infoDictLength);
    }

    tr_variantDictAddInt(&val, TR_KEY_p, tr_sessionGetPublicPeerPort(msgs->session));
    tr_variantDictAddInt(&val, TR_KEY_reqq, REQQ);
    tr_variantDictAddBool(&val, TR_KEY_upload_only, tr_torrentIsSeed(msgs->torrent));
    tr_variantDictAddQuark(&val, TR_KEY_v, version_quark);

    if (allow_metadata_xfer || allow_pex)
    {
        tr_variant* m = tr_variantDictAddDict(&val, TR_KEY_m, 2);

        if (allow_metadata_xfer)
        {
            tr_variantDictAddInt(m, TR_KEY_ut_metadata, UT_METADATA_ID);
        }

        if (allow_pex)
        {
            tr_variantDictAddInt(m, TR_KEY_ut_pex, UT_PEX_ID);
        }
    }

    payload = tr_variantToBuf(&val, TR_VARIANT_FMT_BENC);

    evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
    evbuffer_add_uint8(out, BT_LTEP);
    evbuffer_add_uint8(out, LTEP_HANDSHAKE);
    evbuffer_add_buffer(out, payload);
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
    dbgOutMessageLen(msgs);

    /* cleanup */
    evbuffer_free(payload);
    tr_variantFree(&val);
}

static void parseLtepHandshake(tr_peerMsgsImpl* msgs, uint32_t len, struct evbuffer* inbuf)
{
    int64_t i;
    tr_variant val;
    tr_variant* sub;
    uint8_t* tmp = tr_new(uint8_t, len);
    uint8_t const* addr;
    size_t addr_len;
    tr_pex pex;

    memset(&pex, 0, sizeof(tr_pex));

    tr_peerIoReadBytes(msgs->io, inbuf, tmp, len);
    msgs->peerSentLtepHandshake = true;

    if (tr_variantFromBenc(&val, tmp, len) != 0 || !tr_variantIsDict(&val))
    {
        dbgmsg(msgs, "GET  extended-handshake, couldn't get dictionary");
        tr_free(tmp);
        return;
    }

    /* arbitrary limit, should be more than enough */
    if (len <= 4096)
    {
        dbgmsg(msgs, "here is the handshake: [%*.*s]", TR_ARG_TUPLE((int)len, (int)len, tmp));
    }
    else
    {
        dbgmsg(msgs, "handshake length is too big (%" PRIu32 "), printing skipped", len);
    }

    /* does the peer prefer encrypted connections? */
    if (tr_variantDictFindInt(&val, TR_KEY_e, &i))
    {
        msgs->encryption_preference = i != 0 ? ENCRYPTION_PREFERENCE_YES : ENCRYPTION_PREFERENCE_NO;

        if (i != 0)
        {
            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
        }
    }

    /* check supported messages for utorrent pex */
    msgs->peerSupportsPex = false;
    msgs->peerSupportsMetadataXfer = false;

    if (tr_variantDictFindDict(&val, TR_KEY_m, &sub))
    {
        if (tr_variantDictFindInt(sub, TR_KEY_ut_pex, &i))
        {
            msgs->peerSupportsPex = i != 0;
            msgs->ut_pex_id = (uint8_t)i;
            dbgmsg(msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id);
        }

        if (tr_variantDictFindInt(sub, TR_KEY_ut_metadata, &i))
        {
            msgs->peerSupportsMetadataXfer = i != 0;
            msgs->ut_metadata_id = (uint8_t)i;
            dbgmsg(msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id);
        }

        if (tr_variantDictFindInt(sub, TR_KEY_ut_holepunch, &i))
        {
            /* Mysterious µTorrent extension that we don't grok.  However,
               it implies support for µTP, so use it to indicate that. */
            tr_peerMgrSetUtpFailed(msgs->torrent, tr_peerIoGetAddress(msgs->io, nullptr), false);
        }
    }

    /* look for metainfo size (BEP 9) */
    if (tr_variantDictFindInt(&val, TR_KEY_metadata_size, &i) && tr_torrentSetMetadataSizeHint(msgs->torrent, i))
    {
        msgs->metadata_size_hint = (size_t)i;
    }

    /* look for upload_only (BEP 21) */
    if (tr_variantDictFindInt(&val, TR_KEY_upload_only, &i))
    {
        pex.flags |= ADDED_F_SEED_FLAG;
    }

    /* get peer's listening port */
    if (tr_variantDictFindInt(&val, TR_KEY_p, &i))
    {
        pex.port = htons((uint16_t)i);
        msgs->publishClientGotPort(pex.port);
        dbgmsg(msgs, "peer's port is now %d", (int)i);
    }

    if (tr_peerIoIsIncoming(msgs->io) && tr_variantDictFindRaw(&val, TR_KEY_ipv4, &addr, &addr_len) && addr_len == 4)
    {
        pex.addr.type = TR_AF_INET;
        memcpy(&pex.addr.addr.addr4, addr, 4);
        tr_peerMgrAddPex(msgs->torrent, TR_PEER_FROM_LTEP, &pex, 1);
    }

    if (tr_peerIoIsIncoming(msgs->io) && tr_variantDictFindRaw(&val, TR_KEY_ipv6, &addr, &addr_len) && addr_len == 16)
    {
        pex.addr.type = TR_AF_INET6;
        memcpy(&pex.addr.addr.addr6, addr, 16);
        tr_peerMgrAddPex(msgs->torrent, TR_PEER_FROM_LTEP, &pex, 1);
    }

    /* get peer's maximum request queue size */
    if (tr_variantDictFindInt(&val, TR_KEY_reqq, &i))
    {
        msgs->reqq = i;
    }

    tr_variantFree(&val);
    tr_free(tmp);
}

static void parseUtMetadata(tr_peerMsgsImpl* msgs, uint32_t msglen, struct evbuffer* inbuf)
{
    int64_t msg_type = -1;
    int64_t piece = -1;
    int64_t total_size = 0;
    uint8_t* const tmp = tr_new(uint8_t, msglen);

    tr_peerIoReadBytes(msgs->io, inbuf, tmp, msglen);
    char const* const msg_end = (char const*)tmp + msglen;

    tr_variant dict;
    char const* benc_end;
    if (tr_variantFromBencFull(&dict, tmp, msglen, nullptr, &benc_end) == 0)
    {
        (void)tr_variantDictFindInt(&dict, TR_KEY_msg_type, &msg_type);
        (void)tr_variantDictFindInt(&dict, TR_KEY_piece, &piece);
        (void)tr_variantDictFindInt(&dict, TR_KEY_total_size, &total_size);
        tr_variantFree(&dict);
    }

    dbgmsg(msgs, "got ut_metadata msg: type %d, piece %d, total_size %d", (int)msg_type, (int)piece, (int)total_size);

    if (msg_type == METADATA_MSG_TYPE_REJECT)
    {
        /* NOOP */
    }

    if (msg_type == METADATA_MSG_TYPE_DATA && !tr_torrentHasMetadata(msgs->torrent) &&
        msg_end - benc_end <= METADATA_PIECE_SIZE && piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size)
    {
        int const pieceLen = msg_end - benc_end;
        tr_torrentSetMetadataPiece(msgs->torrent, piece, benc_end, pieceLen);
    }

    if (msg_type == METADATA_MSG_TYPE_REQUEST)
    {
        if (piece >= 0 && tr_torrentHasMetadata(msgs->torrent) && !tr_torrentIsPrivate(msgs->torrent) &&
            msgs->peerAskedForMetadataCount < METADATA_REQQ)
        {
            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
        }
        else
        {
            tr_variant v;
            struct evbuffer* payload;
            struct evbuffer* out = msgs->outMessages;

            /* build the rejection message */
            tr_variantInitDict(&v, 2);
            tr_variantDictAddInt(&v, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
            tr_variantDictAddInt(&v, TR_KEY_piece, piece);
            payload = tr_variantToBuf(&v, TR_VARIANT_FMT_BENC);

            /* write it out as a LTEP message to our outMessages buffer */
            evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
            evbuffer_add_uint8(out, BT_LTEP);
            evbuffer_add_uint8(out, msgs->ut_metadata_id);
            evbuffer_add_buffer(out, payload);
            pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
            dbgOutMessageLen(msgs);

            /* cleanup */
            evbuffer_free(payload);
            tr_variantFree(&v);
        }
    }

    tr_free(tmp);
}

static void parseUtPex(tr_peerMsgsImpl* msgs, uint32_t msglen, struct evbuffer* inbuf)
{
    tr_torrent* tor = msgs->torrent;
    if (!tr_torrentAllowsPex(tor))
    {
        return;
    }

    uint8_t* tmp = tr_new(uint8_t, msglen);
    tr_peerIoReadBytes(msgs->io, inbuf, tmp, msglen);

    tr_variant val;
    bool const loaded = tr_variantFromBenc(&val, tmp, msglen) == 0;

    tr_free(tmp);

    if (!loaded)
    {
        return;
    }

    uint8_t const* added;
    size_t added_len;

    if (tr_variantDictFindRaw(&val, TR_KEY_added, &added, &added_len))
    {
        tr_pex* pex;
        size_t n;
        size_t added_f_len;
        uint8_t const* added_f;

        if (!tr_variantDictFindRaw(&val, TR_KEY_added_f, &added_f, &added_f_len))
        {
            added_f_len = 0;
            added_f = nullptr;
        }

        pex = tr_peerMgrCompactToPex(added, added_len, added_f, added_f_len, &n);
        n = std::min(n, size_t{ MAX_PEX_PEER_COUNT });
        tr_peerMgrAddPex(tor, TR_PEER_FROM_PEX, pex, n);

        tr_free(pex);
    }

    if (tr_variantDictFindRaw(&val, TR_KEY_added6, &added, &added_len))
    {
        tr_pex* pex;
        size_t n;
        size_t added_f_len;
        uint8_t const* added_f;

        if (!tr_variantDictFindRaw(&val, TR_KEY_added6_f, &added_f, &added_f_len))
        {
            added_f_len = 0;
            added_f = nullptr;
        }

        pex = tr_peerMgrCompact6ToPex(added, added_len, added_f, added_f_len, &n);
        n = std::min(n, size_t{ MAX_PEX_PEER_COUNT });
        tr_peerMgrAddPex(tor, TR_PEER_FROM_PEX, pex, n);

        tr_free(pex);
    }

    tr_variantFree(&val);
}

static void sendPex(tr_peerMsgsImpl* msgs);

static void parseLtep(tr_peerMsgsImpl* msgs, uint32_t msglen, struct evbuffer* inbuf)
{
    TR_ASSERT(msglen > 0);

    uint8_t ltep_msgid;

    tr_peerIoReadUint8(msgs->io, inbuf, &ltep_msgid);
    msglen--;

    if (ltep_msgid == LTEP_HANDSHAKE)
    {
        dbgmsg(msgs, "got ltep handshake");
        parseLtepHandshake(msgs, msglen, inbuf);

        if (tr_peerIoSupportsLTEP(msgs->io))
        {
            sendLtepHandshake(msgs);
            sendPex(msgs);
        }
    }
    else if (ltep_msgid == UT_PEX_ID)
    {
        dbgmsg(msgs, "got ut pex");
        msgs->peerSupportsPex = true;
        parseUtPex(msgs, msglen, inbuf);
    }
    else if (ltep_msgid == UT_METADATA_ID)
    {
        dbgmsg(msgs, "got ut metadata");
        msgs->peerSupportsMetadataXfer = true;
        parseUtMetadata(msgs, msglen, inbuf);
    }
    else
    {
        dbgmsg(msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid);
        evbuffer_drain(inbuf, msglen);
    }
}

static ReadState readBtLength(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size_t inlen)
{
    uint32_t len;

    if (inlen < sizeof(len))
    {
        return READ_LATER;
    }

    tr_peerIoReadUint32(msgs->io, inbuf, &len);

    if (len == 0) /* peer sent us a keepalive message */
    {
        dbgmsg(msgs, "got KeepAlive");
    }
    else
    {
        msgs->incoming.length = len;
        msgs->state = AWAITING_BT_ID;
    }

    return READ_NOW;
}

static ReadState readBtMessage(tr_peerMsgsImpl*, struct evbuffer*, size_t);

static ReadState readBtId(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size_t inlen)
{
    uint8_t id;

    if (inlen < sizeof(uint8_t))
    {
        return READ_LATER;
    }

    tr_peerIoReadUint8(msgs->io, inbuf, &id);
    msgs->incoming.id = id;
    dbgmsg(msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length);

    if (id == BT_PIECE)
    {
        msgs->state = AWAITING_BT_PIECE;
        return READ_NOW;
    }
    else if (msgs->incoming.length != 1)
    {
        msgs->state = AWAITING_BT_MESSAGE;
        return READ_NOW;
    }
    else
    {
        return readBtMessage(msgs, inbuf, inlen - 1);
    }
}

static void updatePeerProgress(tr_peerMsgsImpl* msgs)
{
    tr_peerUpdateProgress(msgs->torrent, msgs);

    msgs->update_interest();
}

static void prefetchPieces(tr_peerMsgsImpl* msgs)
{
    if (!msgs->session->isPrefetchEnabled)
    {
        return;
    }

    for (int i = msgs->prefetchCount; i < msgs->pendingReqsToClient && i < PREFETCH_SIZE; ++i)
    {
        struct peer_request const* req = msgs->peerAskedFor + i;

        if (requestIsValid(msgs, req))
        {
            tr_cachePrefetchBlock(msgs->session->cache, msgs->torrent, req->index, req->offset, req->length);
            ++msgs->prefetchCount;
        }
    }
}

static void peerMadeRequest(tr_peerMsgsImpl* msgs, struct peer_request const* req)
{
    bool const fext = tr_peerIoSupportsFEXT(msgs->io);
    bool const reqIsValid = requestIsValid(msgs, req);
    bool const clientHasPiece = reqIsValid && tr_torrentPieceIsComplete(msgs->torrent, req->index);
    bool const peerIsChoked = msgs->peer_is_choked_;

    bool allow = false;

    if (!reqIsValid)
    {
        dbgmsg(msgs, "rejecting an invalid request.");
    }
    else if (!clientHasPiece)
    {
        dbgmsg(msgs, "rejecting request for a piece we don't have.");
    }
    else if (peerIsChoked)
    {
        dbgmsg(msgs, "rejecting request from choked peer");
    }
    else if (msgs->pendingReqsToClient + 1 >= REQQ)
    {
        dbgmsg(msgs, "rejecting request ... reqq is full");
    }
    else
    {
        allow = true;
    }

    if (allow)
    {
        msgs->peerAskedFor[msgs->pendingReqsToClient++] = *req;
        prefetchPieces(msgs);
    }
    else if (fext)
    {
        protocolSendReject(msgs, req);
    }
}

static bool messageLengthIsCorrect(tr_peerMsgsImpl const* msg, uint8_t id, uint32_t len)
{
    switch (id)
    {
    case BT_CHOKE:
    case BT_UNCHOKE:
    case BT_INTERESTED:
    case BT_NOT_INTERESTED:
    case BT_FEXT_HAVE_ALL:
    case BT_FEXT_HAVE_NONE:
        return len == 1;

    case BT_HAVE:
    case BT_FEXT_SUGGEST:
    case BT_FEXT_ALLOWED_FAST:
        return len == 5;

    case BT_BITFIELD:
        if (tr_torrentHasMetadata(msg->torrent))
        {
            return len == (msg->torrent->info.pieceCount >> 3) + ((msg->torrent->info.pieceCount & 7) != 0 ? 1 : 0) + 1U;
        }

        /* we don't know the piece count yet,
           so we can only guess whether to send true or false */
        if (msg->metadata_size_hint > 0)
        {
            return len <= msg->metadata_size_hint;
        }

        return true;

    case BT_REQUEST:
    case BT_CANCEL:
    case BT_FEXT_REJECT:
        return len == 13;

    case BT_PIECE:
        return len > 9 && len <= 16393;

    case BT_PORT:
        return len == 3;

    case BT_LTEP:
        return len >= 2;

    default:
        return false;
    }
}

static int clientGotBlock(tr_peerMsgsImpl* msgs, struct evbuffer* block, struct peer_request const* req);

static ReadState readBtPiece(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size_t inlen, size_t* setme_piece_bytes_read)
{
    TR_ASSERT(evbuffer_get_length(inbuf) >= inlen);

    dbgmsg(msgs, "In readBtPiece");

    struct peer_request* req = &msgs->incoming.blockReq;

    if (req->length == 0)
    {
        if (inlen < 8)
        {
            return READ_LATER;
        }

        tr_peerIoReadUint32(msgs->io, inbuf, &req->index);
        tr_peerIoReadUint32(msgs->io, inbuf, &req->offset);
        req->length = msgs->incoming.length - 9;
        dbgmsg(msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length);
        return READ_NOW;
    }
    else
    {
        int err;
        size_t n;
        size_t nLeft;
        struct evbuffer* block_buffer;

        if (msgs->incoming.block == nullptr)
        {
            msgs->incoming.block = evbuffer_new();
        }

        block_buffer = msgs->incoming.block;

        /* read in another chunk of data */
        nLeft = req->length - evbuffer_get_length(block_buffer);
        n = std::min(nLeft, inlen);

        tr_peerIoReadBytesToBuf(msgs->io, inbuf, block_buffer, n);

        msgs->publishClientGotPieceData(n);
        *setme_piece_bytes_read += n;
        dbgmsg(
            msgs,
            "got %zu bytes for block %u:%u->%u ... %d remain",
            n,
            req->index,
            req->offset,
            req->length,
            (int)(req->length - evbuffer_get_length(block_buffer)));

        if (evbuffer_get_length(block_buffer) < req->length)
        {
            return READ_LATER;
        }

        /* pass the block along... */
        err = clientGotBlock(msgs, block_buffer, req);
        evbuffer_drain(block_buffer, evbuffer_get_length(block_buffer));

        /* cleanup */
        req->length = 0;
        msgs->state = AWAITING_BT_LENGTH;
        return err != 0 ? READ_ERR : READ_NOW;
    }
}

static ReadState readBtMessage(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size_t inlen)
{
    uint8_t const id = msgs->incoming.id;
#ifdef TR_ENABLE_ASSERTS
    size_t const startBufLen = evbuffer_get_length(inbuf);
#endif
    bool const fext = tr_peerIoSupportsFEXT(msgs->io);

    uint32_t ui32;
    uint32_t msglen = msgs->incoming.length;

    TR_ASSERT(msglen > 0);

    --msglen; /* id length */

    dbgmsg(msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen);

    if (inlen < msglen)
    {
        return READ_LATER;
    }

    if (!messageLengthIsCorrect(msgs, id, msglen + 1))
    {
        dbgmsg(msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen);
        msgs->publishError(EMSGSIZE);
        return READ_ERR;
    }

    switch (id)
    {
    case BT_CHOKE:
        dbgmsg(msgs, "got Choke");
        msgs->client_is_choked_ = true;

        if (!fext)
        {
            msgs->publishGotChoke();
        }

        msgs->update_active(TR_PEER_TO_CLIENT);
        break;

    case BT_UNCHOKE:
        dbgmsg(msgs, "got Unchoke");
        msgs->client_is_choked_ = false;
        msgs->update_active(TR_PEER_TO_CLIENT);
        updateDesiredRequestCount(msgs);
        break;

    case BT_INTERESTED:
        dbgmsg(msgs, "got Interested");
        msgs->peer_is_interested_ = true;
        msgs->update_active(TR_CLIENT_TO_PEER);
        break;

    case BT_NOT_INTERESTED:
        dbgmsg(msgs, "got Not Interested");
        msgs->peer_is_interested_ = false;
        msgs->update_active(TR_CLIENT_TO_PEER);
        break;

    case BT_HAVE:
        tr_peerIoReadUint32(msgs->io, inbuf, &ui32);
        dbgmsg(msgs, "got Have: %u", ui32);

        if (tr_torrentHasMetadata(msgs->torrent) && ui32 >= msgs->torrent->info.pieceCount)
        {
            msgs->publishError(ERANGE);
            return READ_ERR;
        }

        /* a peer can send the same HAVE message twice... */
        if (!msgs->have.readBit(ui32))
        {
            msgs->have.setBit(ui32);
            msgs->publishClientGotHave(ui32);
        }

        updatePeerProgress(msgs);
        break;

    case BT_BITFIELD:
        {
            uint8_t* tmp = tr_new(uint8_t, msglen);
            dbgmsg(msgs, "got a bitfield");
            tr_peerIoReadBytes(msgs->io, inbuf, tmp, msglen);
            msgs->have.setRaw(tmp, msglen, tr_torrentHasMetadata(msgs->torrent));
            msgs->publishClientGotBitfield(&msgs->have);
            updatePeerProgress(msgs);
            tr_free(tmp);
            break;
        }

    case BT_REQUEST:
        {
            struct peer_request r;
            tr_peerIoReadUint32(msgs->io, inbuf, &r.index);
            tr_peerIoReadUint32(msgs->io, inbuf, &r.offset);
            tr_peerIoReadUint32(msgs->io, inbuf, &r.length);
            dbgmsg(msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length);
            peerMadeRequest(msgs, &r);
            break;
        }

    case BT_CANCEL:
        {
            struct peer_request r;
            tr_peerIoReadUint32(msgs->io, inbuf, &r.index);
            tr_peerIoReadUint32(msgs->io, inbuf, &r.offset);
            tr_peerIoReadUint32(msgs->io, inbuf, &r.length);
            msgs->cancelsSentToClient.add(tr_time(), 1);
            dbgmsg(msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length);

            for (int i = 0; i < msgs->pendingReqsToClient; ++i)
            {
                struct peer_request const* req = msgs->peerAskedFor + i;

                if (req->index == r.index && req->offset == r.offset && req->length == r.length)
                {
                    tr_removeElementFromArray(msgs->peerAskedFor, i, sizeof(struct peer_request), msgs->pendingReqsToClient);
                    --msgs->pendingReqsToClient;
                    break;
                }
            }

            break;
        }

    case BT_PIECE:
        TR_ASSERT(false); /* handled elsewhere! */
        break;

    case BT_PORT:
        dbgmsg(msgs, "Got a BT_PORT");
        tr_peerIoReadUint16(msgs->io, inbuf, &msgs->dht_port);

        if (msgs->dht_port > 0)
        {
            tr_dhtAddNode(msgs->session, tr_peerAddress(msgs), msgs->dht_port, false);
        }

        break;

    case BT_FEXT_SUGGEST:
        dbgmsg(msgs, "Got a BT_FEXT_SUGGEST");
        tr_peerIoReadUint32(msgs->io, inbuf, &ui32);

        if (fext)
        {
            msgs->publishClientGotSuggest(ui32);
        }
        else
        {
            msgs->publishError(EMSGSIZE);
            return READ_ERR;
        }

        break;

    case BT_FEXT_ALLOWED_FAST:
        dbgmsg(msgs, "Got a BT_FEXT_ALLOWED_FAST");
        tr_peerIoReadUint32(msgs->io, inbuf, &ui32);

        if (fext)
        {
            msgs->publishClientGotAllowedFast(ui32);
        }
        else
        {
            msgs->publishError(EMSGSIZE);
            return READ_ERR;
        }

        break;

    case BT_FEXT_HAVE_ALL:
        dbgmsg(msgs, "Got a BT_FEXT_HAVE_ALL");

        if (fext)
        {
            msgs->have.setHasAll();
            TR_ASSERT(msgs->have.hasAll());
            msgs->publishClientGotHaveAll();
            updatePeerProgress(msgs);
        }
        else
        {
            msgs->publishError(EMSGSIZE);
            return READ_ERR;
        }

        break;

    case BT_FEXT_HAVE_NONE:
        dbgmsg(msgs, "Got a BT_FEXT_HAVE_NONE");

        if (fext)
        {
            msgs->have.setHasNone();
            msgs->publishClientGotHaveNone();
            updatePeerProgress(msgs);
        }
        else
        {
            msgs->publishError(EMSGSIZE);
            return READ_ERR;
        }

        break;

    case BT_FEXT_REJECT:
        {
            struct peer_request r;
            dbgmsg(msgs, "Got a BT_FEXT_REJECT");
            tr_peerIoReadUint32(msgs->io, inbuf, &r.index);
            tr_peerIoReadUint32(msgs->io, inbuf, &r.offset);
            tr_peerIoReadUint32(msgs->io, inbuf, &r.length);

            if (fext)
            {
                msgs->publishGotRej(&r);
            }
            else
            {
                msgs->publishError(EMSGSIZE);
                return READ_ERR;
            }

            break;
        }

    case BT_LTEP:
        dbgmsg(msgs, "Got a BT_LTEP");
        parseLtep(msgs, msglen, inbuf);
        break;

    default:
        dbgmsg(msgs, "peer sent us an UNKNOWN: %d", (int)id);
        tr_peerIoDrain(msgs->io, inbuf, msglen);
        break;
    }

    TR_ASSERT(msglen + 1 == msgs->incoming.length);
    TR_ASSERT(evbuffer_get_length(inbuf) == startBufLen - msglen);

    msgs->state = AWAITING_BT_LENGTH;
    return READ_NOW;
}

/* returns 0 on success, or an errno on failure */
static int clientGotBlock(tr_peerMsgsImpl* msgs, struct evbuffer* data, struct peer_request const* req)
{
    TR_ASSERT(msgs != nullptr);
    TR_ASSERT(req != nullptr);

    int err;
    tr_torrent* tor = msgs->torrent;
    tr_block_index_t const block = _tr_block(tor, req->index, req->offset);

    if (!requestIsValid(msgs, req))
    {
        dbgmsg(msgs, "dropping invalid block %u:%u->%u", req->index, req->offset, req->length);
        return EBADMSG;
    }

    if (req->length != tr_torBlockCountBytes(msgs->torrent, block))
    {
        dbgmsg(msgs, "wrong block size -- expected %u, got %d", tr_torBlockCountBytes(msgs->torrent, block), req->length);
        return EMSGSIZE;
    }

    dbgmsg(msgs, "got block %u:%u->%u", req->index, req->offset, req->length);

    if (!tr_peerMgrDidPeerRequest(msgs->torrent, msgs, block))
    {
        dbgmsg(msgs, "we didn't ask for this message...");
        return 0;
    }

    if (tr_torrentPieceIsComplete(msgs->torrent, req->index))
    {
        dbgmsg(msgs, "we did ask for this message, but the piece is already complete...");
        return 0;
    }

    /**
    ***  Save the block
    **/

    if ((err = tr_cacheWriteBlock(msgs->session->cache, tor, req->index, req->offset, req->length, data)) != 0)
    {
        return err;
    }

    msgs->blame.setBit(req->index);
    msgs->publishGotBlock(req);
    return 0;
}

static void didWrite(tr_peerIo* io, size_t bytesWritten, bool wasPieceData, void* vmsgs)
{
    auto* msgs = static_cast<tr_peerMsgsImpl*>(vmsgs);

    if (wasPieceData)
    {
        msgs->publishPeerGotPieceData(bytesWritten);
    }

    if (tr_isPeerIo(io) && io->userData != nullptr)
    {
        peerPulse(msgs);
    }
}

static ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece)
{
    ReadState ret;
    auto* msgs = static_cast<tr_peerMsgsImpl*>(vmsgs);
    struct evbuffer* in = tr_peerIoGetReadBuffer(io);
    size_t const inlen = evbuffer_get_length(in);

    dbgmsg(msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state);

    if (inlen == 0)
    {
        ret = READ_LATER;
    }
    else if (msgs->state == AWAITING_BT_PIECE)
    {
        ret = readBtPiece(msgs, in, inlen, piece);
    }
    else
    {
        switch (msgs->state)
        {
        case AWAITING_BT_LENGTH:
            ret = readBtLength(msgs, in, inlen);
            break;

        case AWAITING_BT_ID:
            ret = readBtId(msgs, in, inlen);
            break;

        case AWAITING_BT_MESSAGE:
            ret = readBtMessage(msgs, in, inlen);
            break;

        default:
#ifdef TR_ENABLE_ASSERTS
            TR_ASSERT_MSG(false, "unhandled peer messages state %d", (int)msgs->state);
#else
            ret = READ_ERR;
            break;
#endif
        }
    }

    dbgmsg(msgs, "canRead: ret is %d", (int)ret);

    return ret;
}

/**
***
**/

static void updateDesiredRequestCount(tr_peerMsgsImpl* msgs)
{
    tr_torrent const* const torrent = msgs->torrent;

    /* there are lots of reasons we might not want to request any blocks... */
    if (tr_torrentIsSeed(torrent) || !tr_torrentHasMetadata(torrent) || msgs->client_is_choked_ || !msgs->client_is_interested_)
    {
        msgs->desiredRequestCount = 0;
    }
    else
    {
        int estimatedBlocksInPeriod;
        unsigned int rate_Bps;
        unsigned int irate_Bps;
        int const floor = 4;
        int const seconds = REQUEST_BUF_SECS;
        uint64_t const now = tr_time_msec();

        /* Get the rate limit we should use.
         * FIXME: this needs to consider all the other peers as well... */
        rate_Bps = tr_peerGetPieceSpeed_Bps(msgs, now, TR_PEER_TO_CLIENT);

        if (tr_torrentUsesSpeedLimit(torrent, TR_PEER_TO_CLIENT))
        {
            rate_Bps = std::min(rate_Bps, tr_torrentGetSpeedLimit_Bps(torrent, TR_PEER_TO_CLIENT));
        }

        /* honor the session limits, if enabled */
        if (tr_torrentUsesSessionLimits(torrent) &&
            tr_sessionGetActiveSpeedLimit_Bps(torrent->session, TR_PEER_TO_CLIENT, &irate_Bps))
        {
            rate_Bps = std::min(rate_Bps, irate_Bps);
        }

        /* use this desired rate to figure out how
         * many requests we should send to this peer */
        estimatedBlocksInPeriod = (rate_Bps * seconds) / torrent->blockSize;
        msgs->desiredRequestCount = std::max(floor, estimatedBlocksInPeriod);

        /* honor the peer's maximum request count, if specified */
        if ((msgs->reqq > 0) && (msgs->desiredRequestCount > msgs->reqq))
        {
            msgs->desiredRequestCount = msgs->reqq;
        }
    }
}

static void updateMetadataRequests(tr_peerMsgsImpl* msgs, time_t now)
{
    int piece;

    if (msgs->peerSupportsMetadataXfer && tr_torrentGetNextMetadataRequest(msgs->torrent, now, &piece))
    {
        tr_variant tmp;
        struct evbuffer* payload;
        struct evbuffer* out = msgs->outMessages;

        /* build the data message */
        tr_variantInitDict(&tmp, 3);
        tr_variantDictAddInt(&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REQUEST);
        tr_variantDictAddInt(&tmp, TR_KEY_piece, piece);
        payload = tr_variantToBuf(&tmp, TR_VARIANT_FMT_BENC);

        dbgmsg(msgs, "requesting metadata piece #%d", piece);

        /* write it out as a LTEP message to our outMessages buffer */
        evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
        evbuffer_add_uint8(out, BT_LTEP);
        evbuffer_add_uint8(out, msgs->ut_metadata_id);
        evbuffer_add_buffer(out, payload);
        pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
        dbgOutMessageLen(msgs);

        /* cleanup */
        evbuffer_free(payload);
        tr_variantFree(&tmp);
    }
}

static void updateBlockRequests(tr_peerMsgsImpl* msgs)
{
    if (tr_torrentIsPieceTransferAllowed(msgs->torrent, TR_PEER_TO_CLIENT) && msgs->desiredRequestCount > 0 &&
        msgs->pendingReqsToPeer <= msgs->desiredRequestCount * 0.66)
    {
        TR_ASSERT(msgs->is_client_interested());
        TR_ASSERT(!msgs->is_client_choked());

        int n;
        tr_block_index_t* blocks;
        int const numwant = msgs->desiredRequestCount - msgs->pendingReqsToPeer;

        blocks = tr_new(tr_block_index_t, numwant);
        tr_peerMgrGetNextRequests(msgs->torrent, msgs, numwant, blocks, &n, false);

        for (int i = 0; i < n; ++i)
        {
            protocolSendRequest(msgs, blockToReq(msgs->torrent, blocks[i]));
        }

        tr_free(blocks);
    }
}

static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now)
{
    int piece;
    size_t bytesWritten = 0;
    struct peer_request req;
    bool const haveMessages = evbuffer_get_length(msgs->outMessages) != 0;
    bool const fext = tr_peerIoSupportsFEXT(msgs->io);

    /**
    ***  Protocol messages
    **/

    if (haveMessages && msgs->outMessagesBatchedAt == 0) /* fresh batch */
    {
        dbgmsg(msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length(msgs->outMessages));
        msgs->outMessagesBatchedAt = now;
    }
    else if (haveMessages && now - msgs->outMessagesBatchedAt >= msgs->outMessagesBatchPeriod)
    {
        size_t const len = evbuffer_get_length(msgs->outMessages);
        /* flush the protocol messages */
        dbgmsg(msgs, "flushing outMessages... to %p (length is %zu)", (void*)msgs->io, len);
        tr_peerIoWriteBuf(msgs->io, msgs->outMessages, false);
        msgs->clientSentAnythingAt = now;
        msgs->outMessagesBatchedAt = 0;
        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
        bytesWritten += len;
    }

    /**
    ***  Metadata Pieces
    **/

    if (tr_peerIoGetWriteBufferSpace(msgs->io, now) >= METADATA_PIECE_SIZE && popNextMetadataRequest(msgs, &piece))
    {
        size_t dataLen;
        bool ok = false;

        auto* data = static_cast<char*>(tr_torrentGetMetadataPiece(msgs->torrent, piece, &dataLen));

        if (data != nullptr)
        {
            tr_variant tmp;
            struct evbuffer* payload;
            struct evbuffer* out = msgs->outMessages;

            /* build the data message */
            tr_variantInitDict(&tmp, 3);
            tr_variantDictAddInt(&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_DATA);
            tr_variantDictAddInt(&tmp, TR_KEY_piece, piece);
            tr_variantDictAddInt(&tmp, TR_KEY_total_size, msgs->torrent->infoDictLength);
            payload = tr_variantToBuf(&tmp, TR_VARIANT_FMT_BENC);

            /* write it out as a LTEP message to our outMessages buffer */
            evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload) + dataLen);
            evbuffer_add_uint8(out, BT_LTEP);
            evbuffer_add_uint8(out, msgs->ut_metadata_id);
            evbuffer_add_buffer(out, payload);
            evbuffer_add(out, data, dataLen);
            pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
            dbgOutMessageLen(msgs);

            evbuffer_free(payload);
            tr_variantFree(&tmp);
            tr_free(data);

            ok = true;
        }

        if (!ok) /* send a rejection message */
        {
            tr_variant tmp;
            struct evbuffer* payload;
            struct evbuffer* out = msgs->outMessages;

            /* build the rejection message */
            tr_variantInitDict(&tmp, 2);
            tr_variantDictAddInt(&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
            tr_variantDictAddInt(&tmp, TR_KEY_piece, piece);
            payload = tr_variantToBuf(&tmp, TR_VARIANT_FMT_BENC);

            /* write it out as a LTEP message to our outMessages buffer */
            evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
            evbuffer_add_uint8(out, BT_LTEP);
            evbuffer_add_uint8(out, msgs->ut_metadata_id);
            evbuffer_add_buffer(out, payload);
            pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
            dbgOutMessageLen(msgs);

            evbuffer_free(payload);
            tr_variantFree(&tmp);
        }
    }

    /**
    ***  Data Blocks
    **/

    if (tr_peerIoGetWriteBufferSpace(msgs->io, now) >= msgs->torrent->blockSize && popNextRequest(msgs, &req))
    {
        --msgs->prefetchCount;

        if (requestIsValid(msgs, &req) && tr_torrentPieceIsComplete(msgs->torrent, req.index))
        {
            bool err;
            uint32_t const msglen = 4 + 1 + 4 + 4 + req.length;
            struct evbuffer* out;
            struct evbuffer_iovec iovec[1];

            out = evbuffer_new();
            evbuffer_expand(out, msglen);

            evbuffer_add_uint32(out, sizeof(uint8_t) + 2 * sizeof(uint32_t) + req.length);
            evbuffer_add_uint8(out, BT_PIECE);
            evbuffer_add_uint32(out, req.index);
            evbuffer_add_uint32(out, req.offset);

            evbuffer_reserve_space(out, req.length, iovec, 1);
            err = tr_cacheReadBlock(
                      msgs->session->cache,
                      msgs->torrent,
                      req.index,
                      req.offset,
                      req.length,
                      static_cast<uint8_t*>(iovec[0].iov_base)) != 0;
            iovec[0].iov_len = req.length;
            evbuffer_commit_space(out, iovec, 1);

            /* check the piece if it needs checking... */
            if (!err && tr_torrentPieceNeedsCheck(msgs->torrent, req.index))
            {
                err = !tr_torrentCheckPiece(msgs->torrent, req.index);

                if (err)
                {
                    tr_torrentSetLocalError(
                        msgs->torrent,
                        _("Please Verify Local Data! Piece #%zu is corrupt."),
                        (size_t)req.index);
                }
            }

            if (err)
            {
                if (fext)
                {
                    protocolSendReject(msgs, &req);
                }
            }
            else
            {
                size_t const n = evbuffer_get_length(out);
                dbgmsg(msgs, "sending block %u:%u->%u", req.index, req.offset, req.length);
                TR_ASSERT(n == msglen);
                tr_peerIoWriteBuf(msgs->io, out, true);
                bytesWritten += n;
                msgs->clientSentAnythingAt = now;
                msgs->blocksSentToPeer.add(tr_time(), 1);
            }

            evbuffer_free(out);

            if (err)
            {
                bytesWritten = 0;
                msgs = nullptr;
            }
        }
        else if (fext) /* peer needs a reject message */
        {
            protocolSendReject(msgs, &req);
        }

        if (msgs != nullptr)
        {
            prefetchPieces(msgs);
        }
    }

    /**
    ***  Keepalive
    **/

    if (msgs != nullptr && msgs->clientSentAnythingAt != 0 && now - msgs->clientSentAnythingAt > KEEPALIVE_INTERVAL_SECS)
    {
        dbgmsg(msgs, "sending a keepalive message");
        evbuffer_add_uint32(msgs->outMessages, 0);
        pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
    }

    return bytesWritten;
}

static void peerPulse(void* vmsgs)
{
    auto* msgs = static_cast<tr_peerMsgsImpl*>(vmsgs);
    time_t const now = tr_time();

    if (tr_isPeerIo(msgs->io))
    {
        updateDesiredRequestCount(msgs);
        updateBlockRequests(msgs);
        updateMetadataRequests(msgs, now);
    }

    for (;;)
    {
        if (fillOutputBuffer(msgs, now) < 1)
        {
            break;
        }
    }
}

static void gotError([[maybe_unused]] tr_peerIo* io, short what, void* vmsgs)
{
    auto* msgs = static_cast<tr_peerMsgsImpl*>(vmsgs);

    if ((what & BEV_EVENT_TIMEOUT) != 0)
    {
        dbgmsg(msgs, "libevent got a timeout, what=%hd", what);
    }

    if ((what & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) != 0)
    {
        dbgmsg(msgs, "libevent got an error! what=%hd, errno=%d (%s)", what, errno, tr_strerror(errno));
    }

    msgs->publishError(ENOTCONN);
}

static void sendBitfield(tr_peerMsgsImpl* msgs)
{
    TR_ASSERT(tr_torrentHasMetadata(msgs->torrent));

    void* bytes;
    size_t byte_count = 0;
    struct evbuffer* out = msgs->outMessages;

    bytes = tr_torrentCreatePieceBitfield(msgs->torrent, &byte_count);
    evbuffer_add_uint32(out, sizeof(uint8_t) + byte_count);
    evbuffer_add_uint8(out, BT_BITFIELD);
    evbuffer_add(out, bytes, byte_count);
    dbgmsg(msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length(out));
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);

    tr_free(bytes);
}

static void tellPeerWhatWeHave(tr_peerMsgsImpl* msgs)
{
    bool const fext = tr_peerIoSupportsFEXT(msgs->io);

    if (fext && tr_torrentHasAll(msgs->torrent))
    {
        protocolSendHaveAll(msgs);
    }
    else if (fext && tr_torrentHasNone(msgs->torrent))
    {
        protocolSendHaveNone(msgs);
    }
    else if (!tr_torrentHasNone(msgs->torrent))
    {
        sendBitfield(msgs);
    }
}

/**
***
**/

/* some peers give us error messages if we send
   more than this many peers in a single pex message
   http://wiki.theory.org/BitTorrentPeerExchangeConventions */
#define MAX_PEX_ADDED 50
#define MAX_PEX_DROPPED 50

struct PexDiffs
{
    tr_pex* added;
    tr_pex* dropped;
    tr_pex* elements;
    int addedCount;
    int droppedCount;
    int elementCount;
};

static void pexAddedCb(void const* vpex, void* userData)
{
    auto* diffs = static_cast<PexDiffs*>(userData);
    auto const* pex = static_cast<tr_pex const*>(vpex);

    if (diffs->addedCount < MAX_PEX_ADDED)
    {
        diffs->added[diffs->addedCount++] = *pex;
        diffs->elements[diffs->elementCount++] = *pex;
    }
}

static constexpr void pexDroppedCb(void const* vpex, void* userData)
{
    auto* diffs = static_cast<PexDiffs*>(userData);
    auto const* pex = static_cast<tr_pex const*>(vpex);

    if (diffs->droppedCount < MAX_PEX_DROPPED)
    {
        diffs->dropped[diffs->droppedCount++] = *pex;
    }
}

static constexpr void pexElementCb(void const* vpex, void* userData)
{
    auto* diffs = static_cast<PexDiffs*>(userData);
    auto const* pex = static_cast<tr_pex const*>(vpex);

    diffs->elements[diffs->elementCount++] = *pex;
}

using tr_set_func = void (*)(void const* element, void* userData);

/**
 * @brief find the differences and commonalities in two sorted sets
 * @param a the first set
 * @param aCount the number of elements in the set 'a'
 * @param b the second set
 * @param bCount the number of elements in the set 'b'
 * @param compare the sorting method for both sets
 * @param elementSize the sizeof the element in the two sorted sets
 * @param in_a called for items in set 'a' but not set 'b'
 * @param in_b called for items in set 'b' but not set 'a'
 * @param in_both called for items that are in both sets
 * @param userData user data passed along to in_a, in_b, and in_both
 */
static void tr_set_compare(
    void const* va,
    size_t aCount,
    void const* vb,
    size_t bCount,
    tr_voidptr_compare_func compare,
    size_t elementSize,
    tr_set_func in_a_cb,
    tr_set_func in_b_cb,
    tr_set_func in_both_cb,
    void* userData)
{
    auto* a = static_cast<uint8_t const*>(va);
    auto* b = static_cast<uint8_t const*>(vb);
    uint8_t const* aend = a + elementSize * aCount;
    uint8_t const* bend = b + elementSize * bCount;

    while (a != aend || b != bend)
    {
        if (a == aend)
        {
            (*in_b_cb)(b, userData);
            b += elementSize;
        }
        else if (b == bend)
        {
            (*in_a_cb)(a, userData);
            a += elementSize;
        }
        else
        {
            int const val = (*compare)(a, b);

            if (val == 0)
            {
                (*in_both_cb)(a, userData);
                a += elementSize;
                b += elementSize;
            }
            else if (val < 0)
            {
                (*in_a_cb)(a, userData);
                a += elementSize;
            }
            else if (val > 0)
            {
                (*in_b_cb)(b, userData);
                b += elementSize;
            }
        }
    }
}

static void sendPex(tr_peerMsgsImpl* msgs)
{
    if (msgs->peerSupportsPex && tr_torrentAllowsPex(msgs->torrent))
    {
        PexDiffs diffs;
        PexDiffs diffs6;
        tr_pex* newPex = nullptr;
        tr_pex* newPex6 = nullptr;
        int const newCount = tr_peerMgrGetPeers(msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
        int const newCount6 = tr_peerMgrGetPeers(msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);

        /* build the diffs */
        diffs.added = tr_new(tr_pex, newCount);
        diffs.addedCount = 0;
        diffs.dropped = tr_new(tr_pex, msgs->pexCount);
        diffs.droppedCount = 0;
        diffs.elements = tr_new(tr_pex, newCount + msgs->pexCount);
        diffs.elementCount = 0;
        tr_set_compare(
            msgs->pex,
            msgs->pexCount,
            newPex,
            newCount,
            tr_pexCompare,
            sizeof(tr_pex),
            pexDroppedCb,
            pexAddedCb,
            pexElementCb,
            &diffs);
        diffs6.added = tr_new(tr_pex, newCount6);
        diffs6.addedCount = 0;
        diffs6.dropped = tr_new(tr_pex, msgs->pexCount6);
        diffs6.droppedCount = 0;
        diffs6.elements = tr_new(tr_pex, newCount6 + msgs->pexCount6);
        diffs6.elementCount = 0;
        tr_set_compare(
            msgs->pex6,
            msgs->pexCount6,
            newPex6,
            newCount6,
            tr_pexCompare,
            sizeof(tr_pex),
            pexDroppedCb,
            pexAddedCb,
            pexElementCb,
            &diffs6);
        dbgmsg(
            msgs,
            "pex: old peer count %d+%d, new peer count %d+%d, added %d+%d, removed %d+%d",
            msgs->pexCount,
            msgs->pexCount6,
            newCount,
            newCount6,
            diffs.addedCount,
            diffs6.addedCount,
            diffs.droppedCount,
            diffs6.droppedCount);

        if (diffs.addedCount == 0 && diffs.droppedCount == 0 && diffs6.addedCount == 0 && diffs6.droppedCount == 0)
        {
            tr_free(diffs.elements);
            tr_free(diffs6.elements);
        }
        else
        {
            tr_variant val;
            uint8_t* tmp;
            uint8_t* walk;
            struct evbuffer* payload;
            struct evbuffer* out = msgs->outMessages;

            /* update peer */
            tr_free(msgs->pex);
            msgs->pex = diffs.elements;
            msgs->pexCount = diffs.elementCount;
            tr_free(msgs->pex6);
            msgs->pex6 = diffs6.elements;
            msgs->pexCount6 = diffs6.elementCount;

            /* build the pex payload */
            tr_variantInitDict(&val, 3); /* ipv6 support: left as 3: speed vs. likelihood? */

            if (diffs.addedCount > 0)
            {
                /* "added" */
                tmp = walk = tr_new(uint8_t, diffs.addedCount * 6);

                for (int i = 0; i < diffs.addedCount; ++i)
                {
                    memcpy(walk, &diffs.added[i].addr.addr, 4);
                    walk += 4;
                    memcpy(walk, &diffs.added[i].port, 2);
                    walk += 2;
                }

                TR_ASSERT(walk - tmp == diffs.addedCount * 6);
                tr_variantDictAddRaw(&val, TR_KEY_added, tmp, walk - tmp);
                tr_free(tmp);

                /* "added.f"
                 * unset each holepunch flag because we don't support it. */
                tmp = walk = tr_new(uint8_t, diffs.addedCount);

                for (int i = 0; i < diffs.addedCount; ++i)
                {
                    *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
                }

                TR_ASSERT(walk - tmp == diffs.addedCount);
                tr_variantDictAddRaw(&val, TR_KEY_added_f, tmp, walk - tmp);
                tr_free(tmp);
            }

            if (diffs.droppedCount > 0)
            {
                /* "dropped" */
                tmp = walk = tr_new(uint8_t, diffs.droppedCount * 6);

                for (int i = 0; i < diffs.droppedCount; ++i)
                {
                    memcpy(walk, &diffs.dropped[i].addr.addr, 4);
                    walk += 4;
                    memcpy(walk, &diffs.dropped[i].port, 2);
                    walk += 2;
                }

                TR_ASSERT(walk - tmp == diffs.droppedCount * 6);
                tr_variantDictAddRaw(&val, TR_KEY_dropped, tmp, walk - tmp);
                tr_free(tmp);
            }

            if (diffs6.addedCount > 0)
            {
                /* "added6" */
                tmp = walk = tr_new(uint8_t, diffs6.addedCount * 18);

                for (int i = 0; i < diffs6.addedCount; ++i)
                {
                    memcpy(walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16);
                    walk += 16;
                    memcpy(walk, &diffs6.added[i].port, 2);
                    walk += 2;
                }

                TR_ASSERT(walk - tmp == diffs6.addedCount * 18);
                tr_variantDictAddRaw(&val, TR_KEY_added6, tmp, walk - tmp);
                tr_free(tmp);

                /* "added6.f"
                 * unset each holepunch flag because we don't support it. */
                tmp = walk = tr_new(uint8_t, diffs6.addedCount);

                for (int i = 0; i < diffs6.addedCount; ++i)
                {
                    *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
                }

                TR_ASSERT(walk - tmp == diffs6.addedCount);
                tr_variantDictAddRaw(&val, TR_KEY_added6_f, tmp, walk - tmp);
                tr_free(tmp);
            }

            if (diffs6.droppedCount > 0)
            {
                /* "dropped6" */
                tmp = walk = tr_new(uint8_t, diffs6.droppedCount * 18);

                for (int i = 0; i < diffs6.droppedCount; ++i)
                {
                    memcpy(walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16);
                    walk += 16;
                    memcpy(walk, &diffs6.dropped[i].port, 2);
                    walk += 2;
                }

                TR_ASSERT(walk - tmp == diffs6.droppedCount * 18);
                tr_variantDictAddRaw(&val, TR_KEY_dropped6, tmp, walk - tmp);
                tr_free(tmp);
            }

            /* write the pex message */
            payload = tr_variantToBuf(&val, TR_VARIANT_FMT_BENC);
            evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
            evbuffer_add_uint8(out, BT_LTEP);
            evbuffer_add_uint8(out, msgs->ut_pex_id);
            evbuffer_add_buffer(out, payload);
            pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
            dbgmsg(msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length(out));
            dbgOutMessageLen(msgs);

            evbuffer_free(payload);
            tr_variantFree(&val);
        }

        /* cleanup */
        tr_free(diffs.added);
        tr_free(diffs.dropped);
        tr_free(newPex);
        tr_free(diffs6.added);
        tr_free(diffs6.dropped);
        tr_free(newPex6);
    }
}

static void pexPulse([[maybe_unused]] evutil_socket_t fd, [[maybe_unused]] short what, void* vmsgs)
{
    auto* msgs = static_cast<tr_peerMsgsImpl*>(vmsgs);

    sendPex(msgs);

    TR_ASSERT(msgs->pex_timer);
    tr_timerAdd(msgs->pex_timer.get(), PEX_INTERVAL_SECS, 0);
}
